08d7125cd52d9eb33a6e8c27c2007193f83373ea,modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java,IndexersRouter,clusterChanged,#ClusterChangedEvent#,45

Before Change



    /**
     * Reacts to a cluster change by submitting an indexer state update task
     * when the event reports that the set of nodes changed. Events without a
     * node change are ignored.
     *
     * @param event the cluster change notification to inspect
     */
    @Override public void clusterChanged(final ClusterChangedEvent event) {
        // nothing to do unless the node set changed
        if (!event.nodesChanged()) {
            return;
        }

        final IndexerClusterStateUpdateTask rerouteTask = new IndexerClusterStateUpdateTask() {
            @Override public IndexerClusterState execute(IndexerClusterState currentState) {
                // Placeholder body: no new state is computed yet.
                // NOTE(review): returns null rather than currentState -- confirm how the
                // indexer cluster service treats a null result from a state update task.
                return null;
            }
        };
        indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", rerouteTask);
    }
}

After Change


            return;
        }
        if (event.nodesChanged() || event.metaDataChanged()) {
            indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
                /**
                 * Reconciles the indexer routing table with the mappings of the indexer index.
                 * <p>
                 * If the indexer index is absent from the event's metadata, any existing routing is
                 * replaced by an empty one. Otherwise a routing entry (unassigned, no node) is added
                 * for every mapping type that has no routing yet and whose {@code _meta} document
                 * declares a {@code type}, and routing entries whose mapping type no longer exists
                 * are removed.
                 *
                 * @param currentState the indexer cluster state to derive the new state from
                 * @return a new state carrying the updated routing, or {@code currentState} itself
                 *         when nothing changed
                 */
                @Override public IndexerClusterState execute(IndexerClusterState currentState) {
                    if (!event.state().metaData().hasIndex(indexerIndexName)) {
                        // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
                        if (!currentState.routing().isEmpty()) {
                            return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build();
                        }
                        return currentState;
                    }

                    // start from the current routing and track whether anything was actually modified
                    IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing());
                    boolean dirty = false;

                    IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName);
                    // go over and create new indexer routing (with no node) for new types (indexers names)
                    for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
                        String mappingType = entry.getKey(); // mapping type is the name of the indexer
                        if (!currentState.routing().hasIndexerByName(mappingType)) {
                            // no indexer, we need to add it to the routing with no node allocation
                            try {
                                // NOTE(review): this get is executed synchronously inside the state update
                                // task -- confirm that blocking here is acceptable.
                                GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet();
                                // a mapping type without a _meta document is skipped; no routing is added for it
                                if (getResponse.exists()) {
                                    String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);
                                    if (indexerType == null) {
                                        // report the indexer (mapping type) that lacks a type, not the shared index name
                                        logger.warn("no indexer type provided for [{}], ignoring...", mappingType);
                                    } else {
                                        routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null));
                                        dirty = true;
                                    }
                                }
                            } catch (Exception e) {
                                // best effort: keep processing the remaining mapping types, but keep the cause in the log
                                logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
                            }
                        }
                    }
                    // now, remove routings that were deleted
                    for (IndexerRouting routing : currentState.routing()) {
                        if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) {
                            routingBuilder.remove(routing);
                            dirty = true;
                        }
                    }

                    // TODO: allocate indexers (not implemented in this block yet)

                    // see if we can relocate indexers (we can simply unassign them first, then publish) so that they
                    // get assigned on the next round; but if this is the logic, we need to make sure that there
                    // *will* be a next round


                    // only build a new state when the routing actually changed
                    if (dirty) {
                        return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build();
                    }
                    return currentState;
                }
            });
        }
    }
}